import numpy as np
import pandas as pd
import pymc as pm
import arviz as az
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import seaborn as sns
from matplotlib.patches import FancyArrowPatch
from sklearn.preprocessing import SplineTransformer
from scipy.integrate import trapezoid
from scipy.signal import medfilt
from dataclasses import dataclass, field, InitVar
from typing import List, Optional, Tuple
from pathlib import Path


@dataclass(frozen=True)
class WorkoutPhase:
    """One contiguous segment of the workout."""
    name: str
    duration_sec: int
    intensity: str  # 'low', 'high', or 'cooldown'


@dataclass(frozen=True)
class WorkoutConfig:
    """An ordered sequence of phases describing the whole workout."""
    phases: List[WorkoutPhase]

    @property
    def total_duration(self) -> int:
        """Total workout length in seconds."""
        return sum(p.duration_sec for p in self.phases)

    def iter_phases(self):
        """Yield (phase, (start_sec, end_sec)) for every phase in order."""
        current_time = 0
        for phase in self.phases:
            yield phase, (current_time, current_time + phase.duration_sec)
            current_time += phase.duration_sec

    def get_windows(self, intensity: str) -> List[Tuple[int, int]]:
        """Calculates (start, end) timestamps for all intervals of given intensity."""
        return [(s, e) for phase, (s, e) in self.iter_phases()
                if phase.intensity == intensity]

    def get_cooldown_window(self) -> Tuple[int, int]:
        """Return the (start, end) of the first cooldown phase.

        Raises:
            ValueError: if no phase has intensity 'cooldown'.
        """
        for phase, (s, e) in self.iter_phases():
            if phase.intensity == 'cooldown':
                return s, e
        raise ValueError("No cooldown phase defined in config.")


def read_csv(file):
    """Read a Polar export CSV: skip the 2-line header, keep columns 1:3
    (expected to be 'Time' and 'HR (bpm)'), and add 'Time_sec'."""
    df = pd.read_csv(file, skiprows=2).iloc[:, 1:3]
    # Session-specific patch: the 2025-12-24 recording started late, so pad
    # it by repeating its first 110 rows to keep sessions aligned.
    if '2025-12-24' in file.stem:
        pad_df = df.head(110)
        df = pd.concat([pad_df, df])
    td = pd.to_timedelta(df['Time'])
    df['Time_sec'] = td.dt.total_seconds()
    return df


def load_preprocess_and_filter(file_paths, kernel_size=11, n_drop=0):
    """
    1. Loads CSVs and drops the first N data points.
    2. Resets time so the first remaining point is t=0.
    3. Interpolates to a common grid and applies a Median Filter.

    Returns:
        (common_time, hr_matrix): a 1 Hz time grid and an array of shape
        (n_sessions, len(common_time)) of median-filtered heart rates.
    """
    processed_dfs = []
    max_duration = 0
    # Pass 1: Load, Drop, and find global max time
    for file in file_paths:
        df = read_csv(file)
        # --- Constraint: Drop first N points ---
        if n_drop > 0:
            df = df.iloc[n_drop:].reset_index(drop=True)
        # Convert Time to seconds
        df['Time_sec'] = pd.to_timedelta(df['Time']).dt.total_seconds()
        # Shift Time_sec so that the first point after dropping is t=0.
        # This ensures all sessions align at the same relative start point.
        df['Time_sec'] = df['Time_sec'] - df['Time_sec'].iloc[0]
        if df['Time_sec'].max() > max_duration:
            max_duration = df['Time_sec'].max()
        processed_dfs.append(df)

    # Pass 2: interpolate every session onto one shared 1-second grid.
    common_time = np.arange(0, int(max_duration) + 1, 1)
    data_list = []
    for df in processed_dfs:
        # Interpolate HR onto the common grid
        hr_interp = np.interp(common_time, df['Time_sec'], df['HR (bpm)'])
        # Apply Median Filtering to handle Type 1 errors (substantial jitters)
        hr_cleaned = medfilt(hr_interp, kernel_size=kernel_size)
        data_list.append(hr_cleaned)
    return common_time, np.array(data_list)


@dataclass(frozen=True)
class MedianHdi:
    """A scalar posterior summary: median plus an HDI interval."""
    median: float
    hdi: Tuple[float, float]

    @classmethod
    def from_samples(cls, samples, hdi_prob=0.89):
        """Summarize a 1-D array of posterior samples."""
        return cls(np.median(samples, axis=0),
                   tuple(az.hdi(samples, hdi_prob=hdi_prob)))

    @property
    def hdi_width(self):
        return self.hdi[1] - self.hdi[0]

    @property
    def hdi_lower(self):
        return self.hdi[0]

    @property
    def hdi_upper(self):
        return self.hdi[1]


@dataclass(frozen=True)
class MedianHdiSample:
    """A per-timestep posterior summary: median curve plus HDI band."""
    median: np.ndarray
    hdi: np.ndarray  # shape (timesteps, 2): lower / upper bound per step

    @classmethod
    def from_samples(cls, samples, n_samples, hdi_prob=0.89):
        """Summarize a (n_samples, timesteps) array of posterior draws."""
        assert samples.shape[0] == n_samples
        return cls(np.median(samples, axis=0), az.hdi(samples, hdi_prob=hdi_prob))

    @property
    def hdi_width(self):
        return self.hdi[:, 1] - self.hdi[:, 0]

    @property
    def hdi_lower(self):
        return self.hdi[:, 0]

    @property
    def hdi_upper(self):
        return self.hdi[:, 1]


def fit_bayesian_spline(time, hr_data, total_duration, degree, sample_size, knot_every):
    """
    Factored model definition.
    Fits a Bayesian B-spline to `hr_data` (or samples the prior when
    `hr_data` is None) and returns `(trace, B)`: the InferenceData and
    the spline basis matrix.
    """
    # 1. Create Basis
    knots = np.array(range(0, total_duration + 1, knot_every)).reshape((-1, 1))
    transformer = SplineTransformer(knots=knots, degree=degree, include_bias=True)
    B = transformer.fit_transform(time.reshape(-1, 1))

    # 2. Compute Derivative Basis
    # We use gradient here to get the slope of the basis functions
    dB_dt = np.gradient(B, axis=0)

    # 3. Bayesian Model Definition
    with pm.Model() as model:
        w = pm.Normal("w", mu=130, sigma=40, shape=B.shape[1])
        mu = pm.Deterministic("mu", pm.math.dot(B, w))
        # Derivative/Punch at every time step
        accel = pm.Deterministic("accel", pm.math.dot(w, dB_dt.T))
        sigma = pm.HalfNormal("sigma", sigma=5)
        if hr_data is None:
            # No observations: sampling yields the prior predictive structure.
            pm.Normal("obs", mu=mu, sigma=sigma)
        else:
            pm.Normal("obs", mu=mu, sigma=sigma, observed=hr_data)
        # Split the requested sample size across chains so the total number
        # of draws used to build the HDI stays `sample_size`.
        chains = 2
        trace = pm.sample(int(sample_size / chains), tune=1000, chains=chains,
                          target_accept=0.9, progressbar=False)
    return trace, B


def apply_workout_grid(ax, config: WorkoutConfig):
    """
    Overlays the workout structure onto a heart rate plot.
    Shades 'high' intensity regions and labels each phase.
    """
    y_min, y_max = ax.get_ylim()
    xticks = []
    for phase, (start, end) in config.iter_phases():
        if phase.intensity == 'low' or phase.intensity == 'cooldown':
            xticks.append(start)
        # 1. Shade the High Intensity windows
        if phase.intensity == 'high':
            ax.axvspan(start, end, color='red', alpha=0.05, label='_nolegend_')
        # 2. Draw vertical transition lines
        ax.axvline(start, color='grey', linestyle='--', alpha=0.3, linewidth=1)
        # 3. Add Phase Labels at the top of the plot
        midpoint = start + (phase.duration_sec / 2)
        # Only label if the phase is long enough to avoid clutter
        if phase.duration_sec >= 60:
            ax.text(midpoint, y_max * 0.98, phase.name, fontsize=8,
                    ha='center', color='grey', alpha=0.7)
    # Mark the final boundary
    ax.axvline(config.total_duration, color='grey', linestyle='--',
               alpha=0.3, linewidth=1)
    ax.set_xticks(xticks + [config.total_duration])


@dataclass
class TreadmillAnalytic:
    """Fits (or loads) the spline model for one session and exposes
    posterior-based summaries (punches, HRR60, cardiac cost)."""
    time: np.ndarray
    hr: Optional[np.ndarray]  # None => sample the prior instead of fitting
    session_id: str
    degree: int
    config: WorkoutConfig
    # Storage for processed curves
    n_intervals: int = field(init=False, repr=False)
    trace: az.InferenceData = field(init=False, repr=False)
    post_mu: np.ndarray = field(init=False, repr=False)  # samples x seconds
    post_accel: np.ndarray = field(init=False, repr=False)
    load_cache: InitVar[bool] = False
    cache_path: InitVar[Path | None] = None

    def __post_init__(self, load_cache, cache_path):
        """Coordinates the model fitting and signal generation."""
        self.n_intervals = len(self.config.get_windows("high"))
        n_samples = 1000
        if load_cache and cache_path is not None and cache_path.exists():
            self.trace = az.from_netcdf(cache_path)
        else:
            self.trace, _ = fit_bayesian_spline(
                self.time, self.hr, self.config.total_duration,
                self.degree, n_samples, 30)
            # Fix: only persist when a cache path was actually given
            # (to_netcdf(None) used to crash here).
            if cache_path is not None:
                self.trace.to_netcdf(cache_path)
        # Extract posteriors as (samples, seconds) matrices.
        self.post_mu = az.extract(self.trace, var_names="mu").values.T
        self.post_accel = az.extract(self.trace, var_names="accel").values.T
        assert self.post_mu.shape == (n_samples, self.time.shape[0])
        assert self.post_accel.shape == (n_samples, self.time.shape[0])

    def get_acceleration_peaks(self) -> List[Tuple[WorkoutPhase, MedianHdi]]:
        """Per phase, the posterior distribution of the extreme acceleration:
        max for 'high' phases, min for 'low'/'cooldown' phases."""
        results = []
        for phase, (phase_start, phase_end) in self.config.iter_phases():
            win = (self.time >= phase_start) & (self.time <= phase_end)
            # Get the extreme for EVERY sample in the window, yielding a
            # full distribution of per-draw peak values.
            if phase.intensity == "high":
                sample_peaks = self.post_accel[:, win].max(axis=1)
            elif phase.intensity == "low" or phase.intensity == "cooldown":
                sample_peaks = self.post_accel[:, win].min(axis=1)
            else:
                raise ValueError(f"Unknown intensity: {phase.intensity}")
            results.append((phase, MedianHdi.from_samples(sample_peaks)))
        return results

    def get_hrr60(self) -> Tuple[WorkoutPhase, MedianHdi]:
        """Heart-rate recovery over the cooldown, scaled to BPM per 60 s.

        NOTE(review): indexes post_mu by the phase timestamps, which assumes
        the time grid is exactly 1 Hz and covers the full workout — confirm
        against load_preprocess_and_filter's output.

        Raises:
            ValueError: if no phase is named "Cd".
        """
        for phase, (phase_start, phase_end) in self.config.iter_phases():
            if phase.name != "Cd":
                continue
            delta_dist = 60 * (self.post_mu[:, phase_start]
                               - self.post_mu[:, phase_end]) / phase.duration_sec
            return (phase, MedianHdi.from_samples(delta_dist))
        raise ValueError("No cooldown phase defined in config.")

    def get_cardiac_costs(self) -> List[Tuple[WorkoutPhase, MedianHdi]]:
        """Per phase, the posterior distribution of total heart beats."""
        costs = []
        # Get the window for each phase
        for phase, (phase_start, phase_end) in self.config.iter_phases():
            win = (self.time >= phase_start) & (self.time <= phase_end)
            # Integrate HR over the window for every sample. HR is in BPM,
            # so divide by 60 to convert the integral to total beats.
            # (Uses scipy's trapezoid, already imported; np.trapezoid
            # only exists on NumPy >= 2.0.)
            cardiac_costs_dist = trapezoid(
                self.post_mu[:, win], self.time[win], axis=1) / 60
            costs.append((phase, MedianHdi.from_samples(cardiac_costs_dist)))
        return costs

    @staticmethod
    def results_to_df(results: List[Tuple[WorkoutPhase, MedianHdi]]) -> pd.DataFrame:
        """Flatten (phase, summary) pairs into a tidy DataFrame."""
        return pd.DataFrame(
            [(phase.name, metric.median, metric.hdi_lower, metric.hdi_upper)
             for phase, metric in results],
            columns=["Interval", "mean", "low", "high"])


@dataclass
class TreadmillReport:
    """Loads all sessions, fits/loads their models, and provides plots."""
    workout_structure: WorkoutConfig
    # time_axis: np.ndarray = field(init=False)
    sessions: List[TreadmillAnalytic] = field(init=False)
    accel_results: List[Tuple[WorkoutPhase, MedianHdi]] = field(init=False)
    hrr60_results: List[Tuple[WorkoutPhase, MedianHdi]] = field(init=False)
    cardiac_cost_results: List[Tuple[WorkoutPhase, MedianHdi]] = field(init=False)
    data_directory: InitVar[List[Path]]
    with_prior: bool  # append a prior-only "session" for comparison plots

    def __post_init__(self, file_paths: List[Path]):
        degree = 3
        # Session name = the date part of the file stem; cache per file.
        session_names = [p.stem[6:16] for p in file_paths]
        cache_paths = [file_path.with_suffix(".nc") for file_path in file_paths]
        time_axis, raw_data_matrix = load_preprocess_and_filter(
            file_paths, kernel_size=1, n_drop=240)
        self.sessions = [
            TreadmillAnalytic(time_axis, hr_data, sn, degree,
                              self.workout_structure, True, cp)
            for hr_data, sn, cp in zip(raw_data_matrix, session_names, cache_paths)
        ]
        if self.with_prior:
            self.sessions.append(
                TreadmillAnalytic(time_axis, None, "2026-02-01_00-00-00", degree,
                                  self.workout_structure, True, Path("prior.nc"))
            )
        assert len(file_paths) == len(session_names)
        assert len(file_paths) == raw_data_matrix.shape[0]
        # Fix: the NaN check's result used to be silently discarded.
        assert not np.any(np.isnan(raw_data_matrix))

    def plot_raw_data(self):
        """Plot the cleaned HR traces of all observed sessions."""
        fig, ax = plt.subplots(1, 1, figsize=(8, 4))
        for session in self.sessions:
            if session.hr is None:  # skip the prior-only pseudo-session
                continue
            ax.plot(session.time, session.hr, color='gray', linewidth=1,
                    label=session.session_id)
        ax.set_title("Heart Rate Data (%d Sessions)" % len(self.sessions))
        ax.set_ylabel("HR (bpm)")
        ax.set_xlabel("Time (seconds)")
        apply_workout_grid(ax, self.workout_structure)

    def plot_fit_accel(self, fig, axes):
        """Plot posterior HR (top row) and acceleration (bottom row) for the
        last len(axes[1]) sessions, one column per session."""
        n_sessions = len(axes[1])
        for i, session in enumerate(self.sessions[-n_sessions:]):
            ax1, ax2 = axes[0, i], axes[1, i]
            hr_stats = MedianHdiSample.from_samples(
                session.post_mu, session.post_mu.shape[0])
            acc_stats = MedianHdiSample.from_samples(
                session.post_accel, session.post_accel.shape[0])
            # --- Top Plot: Heart Rate ---
            if session.hr is not None:
                ax1.scatter(session.time, session.hr, color='black', s=1, alpha=0.2)
            ax1.plot(session.time, hr_stats.median, color='firebrick',
                     linewidth=2.5, label='Posterior Median HR')
            ax1.fill_between(session.time, hr_stats.hdi_lower, hr_stats.hdi_upper,
                             color='firebrick', alpha=0.2)
            ax1.set_ylabel("Heart Rate (bpm)")
            ax1.set_title(session.session_id)
            apply_workout_grid(ax1, self.workout_structure)
            # --- Bottom Plot: Acceleration ---
            ax2.plot(session.time, acc_stats.median, color='indigo',
                     linewidth=2, label='HR Acceleration')
            ax2.axhline(0, color='black', linewidth=1, alpha=0.5)
            ax2.fill_between(session.time, acc_stats.hdi_lower, acc_stats.hdi_upper,
                             color='firebrick', alpha=0.2)
            ax2.set_ylabel("Acceleration ($\\Delta$BPM/sec)", fontweight='bold')
            ax2.set_xlabel("Time (seconds)", fontweight='bold')
            apply_workout_grid(ax2, self.workout_structure)

    def plot_trend_vector(self, fig, ax, code):
        """Plots historical sessions with a vector arrow and session name annotations."""
        all_coords = []
        for s in self.sessions:
            punches = [punch for (phase, punch) in s.get_acceleration_peaks()
                       if phase.intensity == 'high']
            hrr60 = s.get_hrr60()[1]
            x_stat = punches[0]
            # y axis: recovery ('ph' mode) or the last punch (endurance mode).
            y_stat = (hrr60 if code == 'ph' else punches[-1])
            all_coords.append((
                x_stat.median, y_stat.median,
                (x_stat.median - x_stat.hdi_lower, x_stat.hdi_upper - x_stat.median),
                (y_stat.median - y_stat.hdi_lower, y_stat.hdi_upper - y_stat.median)))
        for i, (session, (x_mu, y_mu, x_err, y_err)) in enumerate(
                zip(self.sessions, all_coords)):
            # Fix: pass both lower and upper HDI deltas as asymmetric error
            # bars (previously only the lower delta was drawn, symmetrically).
            ax.errorbar(x_mu, y_mu,
                        xerr=[[x_err[0]], [x_err[1]]],
                        yerr=[[y_err[0]], [y_err[1]]],
                        fmt='o', color='teal', alpha=0.5, capsize=0,
                        elinewidth=1.2, markersize=4)
            ax.annotate(session.session_id, xy=(x_mu, y_mu), xytext=(5, 5),
                        textcoords='offset points', fontsize=5, color='#333333')
            if i > 0:
                prev_x, prev_y, _, _ = all_coords[i - 1]
                arrow = FancyArrowPatch((prev_x, prev_y), (x_mu, y_mu),
                                        arrowstyle='-|>', mutation_scale=15,
                                        color='teal', linestyle='-', linewidth=1.5,
                                        alpha=0.3, shrinkA=5, shrinkB=5)
                ax.add_patch(arrow)
        # Formatting
        fig.suptitle("Longitudinal Fitness Evolution")
        ax.set_xlabel("1st Punch (Max Accel - BPM/s)")
        if code == 'ph':
            ax.set_ylabel("Recovery (HRR60 - BPM)")
        else:
            ax.set_ylabel("Last Punch (Max Accel - BPM/s)")
        ax.grid(True, linestyle=':', alpha=0.4)

    def plot_cardiac_cost(self):
        """Point plot of per-interval cardiac cost across session dates."""
        cc_df = pd.concat([
            TreadmillAnalytic.results_to_df(s.get_cardiac_costs())
            .assign(session=s.session_id)
            .assign(session_date=lambda x: pd.to_datetime(x.session.str[:10]))
            for s in self.sessions
        ]).reset_index().rename(columns={'mean': 'cc'})
        ax = sns.pointplot(x='session_date', y='cc', hue='Interval',
                           data=cc_df, legend='brief')
        ax.set_ylabel("Cardiac Cost (Total Beats)")
        sns.move_legend(ax, "upper left", bbox_to_anchor=(1, 1))


# Your current specific workout: 3 reps of (3m low, 1m high) + 2m cooldown
standard_3x4 = WorkoutConfig(phases=[
    WorkoutPhase("R1", 180, 'low'),
    WorkoutPhase("S1", 60, 'high'),
    WorkoutPhase("R2", 180, 'low'),
    WorkoutPhase("S2", 60, 'high'),
    WorkoutPhase("R3", 180, 'low'),
    WorkoutPhase("S3", 60, 'high'),
    WorkoutPhase("Cd", 120, 'cooldown'),
])
_cdw = standard_3x4.get_cooldown_window()
file_paths = list(sorted(Path("polar_data").glob("*.CSV")))
report = TreadmillReport(standard_3x4, file_paths, True)
ign_session = report.sessions[-1]
ex_session = report.sessions[-2]
ex_time_point = 310
If you use a heart rate monitor during exercise, it is important to keep wearing it for a few minutes after the exercise ends. In those minutes your body recovers and your heart rate drops until it reaches its normal rate. The faster the heart does that the better – a drop of \(\geq 25\) BPM each minute is good, and the higher the better.
There are other things one can measure during the exercise. For an interval training run, where you alternate between a 1 minute 8min/mile sprint (S) and a light 3 minute 12min/mile recovery (R) run, one can measure:
how fast the heart reaches its high rate as you start to sprint, or the punch – the faster the better
whether the first punch is as high as the last punch – if so, your endurance is good.
how fast it beats during a sprint – if it can get away with a smaller rate, it means it is more efficient
The problem is, heart rate monitors are not perfect. My Polar band (a strap around the chest) is noisy, so measuring these values is hard on the raw data. But one can fit a curve, and measure these values on the curve. Which is exactly what we’ll do here!
Here are the data for 5 exercise sessions. I start monitoring as soon as I start running, the 2 minutes of cool down start at second 720.
Code
report.plot_raw_data()
Below are the median heart rate and acceleration from the fitted (generative) models over the last few exercise sessions. The rightmost model is the prior.
At each second the model provides a (posterior) distribution for the heart rate or its acceleration. For example, this is the distribution of the heart rate for the exercise session 2026-01-05 at two time points.
Below is also the total cardiac cost (total heart beats per interval). Recovery intervals are higher because they are longer. I want these lines to trend down.